dec70940e91a8308c0c842d67275e1e9c8ce382b,PolygonRDD.java,PolygonRDD,SpatialJoinQuery,#PolygonRDD#number#number#number#,80
Before Change
//Key every polygon of this RDD (the join targets) by the Integer id that PartitionAssignGridPolygon assigns (presumably a grid-cell id -- confirm against that class)
JavaPairRDD<Integer,Polygon> TargetSetWithID=this.polygonRDD.mapPartitionsToPair(new PartitionAssignGridPolygon(GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder));
//Key the query-area polygons with an identically configured assigner so that both sides share the same id space
//NOTE(review): the unqualified polygonRDD exposes getPolygonRDD(), unlike this.polygonRDD above -- presumably a method parameter shadowing the field; confirm
JavaPairRDD<Integer,Polygon> QueryAreaSetWithID=polygonRDD.getPolygonRDD().mapPartitionsToPair(new PartitionAssignGridPolygon(GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder));
//Join the two datasets: for each id, cogroup yields the query areas (first Iterable) and the targets (second Iterable)
JavaPairRDD<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> jointSet=QueryAreaSetWithID.cogroup(TargetSetWithID);
//For every query area, collect the targets sharing its grid id that satisfy the spatial predicate
JavaPairRDD<Polygon,String> result=jointSet.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Polygon>, Iterable<Polygon>>>, Polygon,String>()
{
    /**
     * Pairs each query area of one cogrouped id with a string describing its matching targets.
     *
     * @param t the id together with its query areas (t._2()._1()) and its targets (t._2()._2())
     * @return one (query area, matches) tuple per query area; the string holds one
     *         "|coordinates|" entry per matching target and is empty when nothing matched
     */
    public Iterable<Tuple2<Polygon, String>> call(
            Tuple2<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> t)
            throws Exception {
        ArrayList<Tuple2<Polygon, String>> QueryAreaAndPoint=new ArrayList<Tuple2<Polygon, String>>();
        for(Polygon currentQueryArea : t._2()._1())
        {
            StringBuilder matchedTargets=new StringBuilder();
            for(Polygon currentTarget : t._2()._2())
            {
                //condition==0 requires full containment; any other value accepts an intersection
                boolean matches=(condition==0)
                        ? currentQueryArea.contains(currentTarget)
                        : currentQueryArea.intersects(currentTarget);
                if(matches)
                {
                    //Concatenating the Coordinate[] directly would print the array's identity
                    //("[L...;@hash"), so render its elements explicitly
                    matchedTargets.append("|")
                            .append(java.util.Arrays.toString(currentTarget.getCoordinates()))
                            .append("|");
                }
            }
            QueryAreaAndPoint.add(new Tuple2<Polygon, String>(currentQueryArea,matchedTargets.toString()));
        }
        return QueryAreaAndPoint;
    }
});
//Merge all match strings emitted for the same query-area polygon into one string.
//This only concatenates: an entry reported more than once for a query area is not removed.
JavaPairRDD<Polygon,String> refinedResult=result.reduceByKey(new Function2<String,String,String>(){
    /**
     * Combines two partial match strings of the same query area.
     *
     * @param v1 first partial match string, possibly empty
     * @param v2 second partial match string, possibly empty
     * @return the non-empty operand when the other is empty, otherwise v1 followed by v2
     */
    public String call(String v1, String v2) throws Exception {
        //Compare by content: == on Strings tests object identity, not equality
        if(v1.isEmpty())
        {
            return v2;
        }
        if(v2.isEmpty())
        {
            return v1;
        }
        return v1+v2;
    }});
//The merged result is handed back to the caller; nothing is persisted here
return refinedResult;
After Change
//Key every polygon of this RDD (the join targets) by the Integer id that PartitionAssignGridPolygon assigns (presumably a grid-cell id -- confirm against that class)
JavaPairRDD<Integer,Polygon> TargetSetWithID=this.polygonRDD.mapPartitionsToPair(new PartitionAssignGridPolygon(GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder));
//Key the query-area polygons with an identically configured assigner so that both sides share the same id space
//NOTE(review): the unqualified polygonRDD exposes getPolygonRDD(), unlike this.polygonRDD above -- presumably a method parameter shadowing the field; confirm
JavaPairRDD<Integer,Polygon> QueryAreaSetWithID=polygonRDD.getPolygonRDD().mapPartitionsToPair(new PartitionAssignGridPolygon(GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder));
//Join the two datasets: for each id, cogroup yields the query areas (first Iterable) and the targets (second Iterable).
//The cogrouped records are then repartitioned into (partition count of both keyed inputs, summed) * 2 partitions.
JavaPairRDD<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> jointSet=QueryAreaSetWithID.cogroup(TargetSetWithID).repartition((QueryAreaSetWithID.partitions().size()+TargetSetWithID.partitions().size())*2);
//Emit a (query area, target) pair for every combination sharing a grid id that satisfies the spatial predicate
JavaPairRDD<Polygon,Polygon> queryResult=jointSet.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Polygon>, Iterable<Polygon>>>, Polygon,Polygon>()
{
    /**
     * Tests every query area of one cogrouped id against every target of the same id.
     *
     * @param t the id together with its query areas (t._2()._1()) and its targets (t._2()._2())
     * @return one (query area, target) tuple per matching combination; empty when nothing matched
     */
    public Iterable<Tuple2<Polygon, Polygon>> call(
            Tuple2<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> t)
            throws Exception {
        ArrayList<Tuple2<Polygon, Polygon>> QueryAreaAndTarget=new ArrayList<Tuple2<Polygon, Polygon>>();
        for(Polygon currentQueryArea : t._2()._1())
        {
            for(Polygon currentTarget : t._2()._2())
            {
                //condition==0 requires full containment; any other value accepts an intersection
                boolean matches=(condition==0)
                        ? currentQueryArea.contains(currentTarget)
                        : currentQueryArea.intersects(currentTarget);
                if(matches)
                {
                    QueryAreaAndTarget.add(new Tuple2<Polygon,Polygon>(currentQueryArea,currentTarget));
                }
            }
        }
        return QueryAreaAndTarget;
    }
});
//Gather, per query area, every target that was paired with it
JavaPairRDD<Polygon, Iterable<Polygon>> aggregatedResult=queryResult.groupByKey();
//Render each query area's targets as one comma-separated coordinate string, listing each distinct target once
JavaPairRDD<Polygon,String> refinedResult=aggregatedResult.mapToPair(new PairFunction<Tuple2<Polygon,Iterable<Polygon>>,Polygon,String>()
{
    /**
     * Serializes the targets of one query area.
     *
     * Each target is written as "x1,y1,x2,y2,..." and targets are joined with ",".
     * A target whose serialized form was already seen for this query area is skipped.
     *
     * @param t a query area and all targets grouped under it
     * @return the query area paired with the serialized, de-duplicated targets
     */
    public Tuple2<Polygon, String> call(Tuple2<Polygon, Iterable<Polygon>> t)
    {
        //Exact-match de-duplication that keeps first-seen order; a substring test on the
        //accumulated output could wrongly drop a target that was never added
        java.util.Set<String> distinctTargets=new java.util.LinkedHashSet<String>();
        for(Polygon currentTarget : t._2())
        {
            Coordinate[] polygonCoordinate=currentTarget.getCoordinates();
            StringBuilder currentTargetString=new StringBuilder();
            for(int i=0;i<polygonCoordinate.length;i++)
            {
                //Separate consecutive coordinates; without it one point's y runs into the next
                //point's x (e.g. "1.0,2.03.0,4.0") and the output cannot be parsed unambiguously
                if(i>0)
                {
                    currentTargetString.append(",");
                }
                currentTargetString.append(polygonCoordinate[i].x).append(",").append(polygonCoordinate[i].y);
            }
            distinctTargets.add(currentTargetString.toString());
        }
        StringBuilder result=new StringBuilder();
        boolean firstTarget=true;
        for(String targetString : distinctTargets)
        {
            if(!firstTarget)
            {
                result.append(",");
            }
            result.append(targetString);
            firstTarget=false;
        }
        return new Tuple2<Polygon, String>(t._1(),result.toString());
    }
});
//The joined result is handed back to the caller; nothing is persisted here
return refinedResult;